package cn.lingyangwl.framework.mqtt.config;

import cn.lingyangwl.framework.tool.core.StringUtils;
import cn.lingyangwl.framework.tool.core.UUIDUtils;
import cn.lingyangwl.framework.mq.base.enums.QosEnum;
import cn.lingyangwl.framework.mqtt.listener.IMqttListener;
import cn.lingyangwl.framework.mqtt.listener.MqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Spring configuration for the consuming side of the MQTT integration.
 * <p>
 * It creates the inbound channel adapter, subscribes it to the configured default topics
 * plus every topic declared by an {@link IMqttListener} bean through
 * {@link MqttMessageListener}, and dispatches each received message to the listener
 * registered for its topic.
 *
 * @author shenguangyang
 */
@Configuration
public class MqttConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(MqttConsumerConfig.class);

    /** MQTT topic level separator. */
    private static final String TOPIC_LEVEL_SEPARATOR = "/";
    /** MQTT wildcard that matches exactly one topic level. */
    private static final String SINGLE_LEVEL_WILDCARD = "+";
    /** MQTT wildcard that matches the parent level and any number of child levels. */
    private static final String MULTI_LEVEL_WILDCARD = "#";

    @Resource
    private MqttProperties mqttProperties;

    @Resource
    private ApplicationContext applicationContext;

    /**
     * The inbound adapter, exposed so that subscriptions can be changed at runtime.
     * <p>
     * // add one or more subscribed topics
     * adapter.addTopic("topic1"); // default qos is 1
     * adapter.addTopic("topic2", 1);
     * adapter.addTopic("topic3", "topic4");
     * adapter.addTopics(new String[]{"topic5", "topic6"},new int[]{1, 1});
     * // remove one or more subscribed topics
     * adapter.removeTopic("topic1");
     * adapter.removeTopic("topic2", "topic3")
     * <p>
     * Note that topics added this way have no entry in the listener registry, so their
     * messages are only dispatched if a registered topic filter matches them.
     */
    public MqttPahoMessageDrivenChannelAdapter adapter;

    /** Registry of topic filter -> listener, filled while the inbound adapter is built. */
    private static final Map<String, IMqttListener> listenerMap = new ConcurrentHashMap<>();

    /**
     * Bean name of the inbound (subscription) message channel.
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    private final MqttConnectConfig mqttConnectConfig;

    public MqttConsumerConfig() {
        mqttConnectConfig = new MqttConnectConfig();
    }

    /**
     * Collects every {@link IMqttListener} bean of the application context.
     *
     * @return the listeners keyed by bean name
     */
    private Map<String, IMqttListener> collectMqttMessageListener() {
        return applicationContext.getBeansOfType(IMqttListener.class);
    }

    /**
     * Registers every listener bean in the topic registry and subscribes the adapter to
     * the topics the listener declares.
     *
     * @param adapter the inbound adapter to add the subscriptions to
     * @throws IllegalStateException    if a listener bean carries no {@link MqttMessageListener}
     * @throws IllegalArgumentException if a listener declares a QoS list that neither has one
     *                                  entry nor one entry per topic
     */
    private void doListenerConfig(MqttPahoMessageDrivenChannelAdapter adapter) {
        for (Map.Entry<String, IMqttListener> entry : collectMqttMessageListener().entrySet()) {
            IMqttListener iMqttListener = entry.getValue();
            // Resolve the annotation through the bean factory rather than getClass(): the bean
            // instance may be an AOP proxy whose runtime class does not carry the annotation.
            MqttMessageListener mqttMessageListener =
                    applicationContext.findAnnotationOnBean(entry.getKey(), MqttMessageListener.class);
            if (mqttMessageListener == null) {
                throw new IllegalStateException(
                        iMqttListener.getClass().getSimpleName() + " not MqttMessageListener annotation");
            }

            String[] topics = mqttMessageListener.topic();
            // Validate before touching the registry so a bad declaration leaves no partial state.
            int[] qos = resolveQos(topics, mqttMessageListener.qos(), iMqttListener);
            for (String topic : topics) {
                listenerMap.put(topic, iMqttListener);
            }
            adapter.addTopics(topics, qos);
            log.info("register mqtt listener {}", iMqttListener.getClass().getName());
        }
    }

    /**
     * Produces one QoS value per topic, as required by the adapter's {@code addTopics}.
     * A single declared QoS is applied to every topic; otherwise the declaration must
     * contain exactly one QoS per topic.
     *
     * @param topics   the topics declared by the listener
     * @param qosEnums the QoS levels declared by the listener
     * @param listener the listener, used for the error message only
     * @return an array with the same length as {@code topics}
     * @throws IllegalArgumentException if the two declarations cannot be aligned
     */
    private int[] resolveQos(String[] topics, QosEnum[] qosEnums, IMqttListener listener) {
        if (qosEnums.length == topics.length) {
            return getQos(qosEnums);
        }
        if (qosEnums.length == 1) {
            int[] result = new int[topics.length];
            int value = qosEnums[0].getValue();
            for (int i = 0; i < result.length; i++) {
                result[i] = value;
            }
            return result;
        }
        throw new IllegalArgumentException(listener.getClass().getName() + " declares " + topics.length
                + " topic(s) but " + qosEnums.length + " qos value(s); declare one qos or one per topic");
    }

    /**
     * Converts QoS enum constants to their numeric values.
     *
     * @param qosEnums the QoS levels to convert
     * @return the numeric values, in the same order
     */
    public int[] getQos(QosEnum[] qosEnums) {
        int[] result = new int[qosEnums.length];
        for (int i = 0; i < qosEnums.length; i++) {
            result[i] = qosEnums[i].getValue();
        }
        return result;
    }

    /**
     * Looks up the listener responsible for a received topic: an exact registration wins,
     * otherwise the first registered topic filter whose wildcards match is used.
     * When several wildcard filters match, which one is picked is unspecified.
     *
     * @param topic the concrete topic name a message arrived on
     * @return the listener, or {@code null} if none is registered for the topic
     */
    private static IMqttListener findListener(String topic) {
        IMqttListener exact = listenerMap.get(topic);
        if (exact != null) {
            return exact;
        }
        for (Map.Entry<String, IMqttListener> entry : listenerMap.entrySet()) {
            if (topicMatches(entry.getKey(), topic)) {
                return entry.getValue();
            }
        }
        return null;
    }

    /**
     * Tests a concrete topic name against an MQTT topic filter, honouring the {@code +}
     * (one level) and {@code #} (parent and all descendants, last level only) wildcards.
     *
     * @param filter the subscribed topic filter
     * @param topic  the concrete topic name
     * @return {@code true} if the filter covers the topic
     */
    private static boolean topicMatches(String filter, String topic) {
        // Topics starting with '$' are reserved and never matched by a leading wildcard.
        if (topic.startsWith("$")
                && (filter.startsWith(SINGLE_LEVEL_WILDCARD) || filter.startsWith(MULTI_LEVEL_WILDCARD))) {
            return false;
        }
        // limit -1 keeps trailing empty levels, which are significant in MQTT ("a/" != "a").
        String[] filterLevels = filter.split(TOPIC_LEVEL_SEPARATOR, -1);
        String[] topicLevels = topic.split(TOPIC_LEVEL_SEPARATOR, -1);
        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (MULTI_LEVEL_WILDCARD.equals(level)) {
                // Only valid as the last level; it then covers everything that remains.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (!SINGLE_LEVEL_WILDCARD.equals(level) && !level.equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    /**
     * Connection options for the consumer client, built from the MQTT properties.
     */
    @Bean
    public MqttConnectOptions getConsumerMqttConnectOptions() {
        return mqttConnectConfig.getMqttConnectOptions(mqttProperties);
    }

    /**
     * MQTT client factory used by the consumer.
     */
    @Bean
    public MqttPahoClientFactory consumerMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getConsumerMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT message channel (consumer).
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT subscription binding (consumer): builds the inbound adapter, subscribes it to the
     * comma-separated default topics and to every listener-declared topic.
     */
    @Bean
    public MessageProducer inbound() {
        // A random suffix is appended to the configured client id for every adapter instance.
        String clientId = mqttProperties.getConsumer().getClientId() + "-" + UUIDUtils.randomUUID();
        // Several topics can be consumed (subscribed) at the same time.
        this.adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId, consumerMqttClientFactory(),
                StringUtils.split(mqttProperties.getConsumer().getDefaultTopic(), ","));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // Route received messages to the inbound channel.
        adapter.setOutputChannel(mqttInboundChannel());

        doListenerConfig(adapter);
        return adapter;
    }

    /**
     * MQTT message handler (consumer): forwards each message of the inbound channel to the
     * listener registered for its topic, or logs a warning when there is none.
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic"),
                    "mqtt_receivedTopic header is missing").toString();
            String msg = message.getPayload().toString();
            IMqttListener iMqttListener = findListener(topic);
            if (iMqttListener == null) {
                log.warn("topic [{}] not find listener", topic);
                return;
            }
            iMqttListener.onMessage(topic, msg);
        };
    }
}
